[core] Share JDBC connection pool across catalog instances in same JVM#8323
Open
nickdelnano wants to merge 4 commits into
Previously, every call to catalogLoader.load() created a new JdbcCatalog with its own dedicated JdbcClientPool (default 2 connections). In Flink CDC jobs, each operator subtask (parser, writer, committer) creates its own catalog, resulting in O(parallelism * operators * pool_size) JDBC connections per job. This mirrors the pattern used by HiveCatalog's CachedClientPool: a static Caffeine cache keyed on (JDBC URI, catalog-key) shares a single JdbcClientPool across all catalog instances in the same TaskManager JVM. Idle pools are evicted after a configurable interval (default 5 min).
Verifies that:
- Same URI + catalog-key returns the same pool instance
- Different URI returns a different pool instance
- Different catalog-key returns a different pool instance
- The shared pool is usable (connections work)
- Multiple JdbcCatalog instances share the same underlying pool
- resetCache() clears all cached pools
7a048ad to 2d7225b
Fixes 6 issues found in code review:
1. Pool evicted while active: Caffeine's expireAfterAccess would close the pool after 5 minutes even if the catalog was still using it, because the catalog stored the raw pool and never refreshed the cache access time. Fix: use ConcurrentHashMap with no eviction; pools live for the JVM lifetime.
2. Race in init(): synchronized on 'this' but guarding a static field. Fix: eliminated entirely, since ConcurrentHashMap.computeIfAbsent is thread-safe.
3. Credentials not in cache key: catalogs with the same URI but different users would share a pool. Fix: include user:password in the key.
4. First-initializer-wins for eviction config: no longer relevant since there's no eviction configuration.
5. close() was a no-op with no graceful shutdown. Fix: added a JVM shutdown hook that closes all pools.
6. No Caffeine Scheduler meant expired entries lingered: no longer relevant since we don't use Caffeine.
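Fixes 1, 2, 3, and 5 can be illustrated together. The following is a minimal sketch, not the PR's actual CachedJdbcClientPool: the class name SharedPoolRegistry, the helper key(), and the choice of a List<String> as the cache key are all hypothetical and chosen only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for a JVM-wide pool cache; not the PR's real class. */
final class SharedPoolRegistry<P extends AutoCloseable> {

    // No time-based eviction: an entry stays until closeAll() removes it.
    private final Map<List<String>, P> pools = new ConcurrentHashMap<>();

    private SharedPoolRegistry() {}

    /** Creates a registry and registers a shutdown hook that closes every pool. */
    static <P extends AutoCloseable> SharedPoolRegistry<P> create() {
        SharedPoolRegistry<P> registry = new SharedPoolRegistry<>();
        Runtime.getRuntime()
                .addShutdownHook(new Thread(registry::closeAll, "shared-pool-shutdown"));
        return registry;
    }

    /** Assumed key shape: credentials are part of the key, so users never share a pool. */
    static List<String> key(String uri, String catalogKey, String user, String password) {
        return Arrays.asList(uri, catalogKey, user, password);
    }

    /** computeIfAbsent runs the factory at most once per key, with no extra locking. */
    P get(List<String> key, Function<List<String>, P> factory) {
        return pools.computeIfAbsent(key, factory);
    }

    /** Removes and closes every pool; close failures are ignored (best effort). */
    void closeAll() {
        for (List<String> key : pools.keySet()) {
            P pool = pools.remove(key);
            if (pool != null) {
                try {
                    pool.close();
                } catch (Exception ignored) {
                    // Nothing useful can be done during shutdown.
                }
            }
        }
    }

    int size() {
        return pools.size();
    }
}
```

Two callers that pass equal keys get the same pool object back; changing any component of the key, including the password, yields a separate pool.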
2d7225b to a5731b8
When close() is called while a run() action is in-flight, the connection is returned to the deque after close() has already drained it. The connection is then orphaned and never closed. Fix: after returning the client to the deque, check if close() raced us (this.clients == null). If so, remove the client we just added and close it directly. Credit: extracted from apache#8268.
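The race and its fix can be sketched as follows. This is an illustration only: DrainablePool is a hypothetical class, not Paimon's client pool implementation, and it assumes close() sets the deque field to null before draining, which is what makes the post-return check sufficient.

```java
import java.util.Collection;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;

/** Hypothetical pool showing the "check after return" fix; not Paimon's class. */
final class DrainablePool<C extends AutoCloseable> implements AutoCloseable {

    // Set to null by close(); volatile so run() observes the change.
    private volatile LinkedBlockingDeque<C> clients;

    DrainablePool(Collection<C> initial) {
        this.clients = new LinkedBlockingDeque<>(initial);
    }

    <R> R run(Function<C, R> action) throws Exception {
        LinkedBlockingDeque<C> deque = this.clients;
        if (deque == null) {
            throw new IllegalStateException("pool is closed");
        }
        C client = deque.takeFirst(); // blocks until a client is available
        try {
            return action.apply(client);
        } finally {
            deque.addFirst(client);
            // If close() ran while the action was in flight, it may already have
            // drained the deque. Whoever removes the client closes it, so it is
            // closed exactly once and never orphaned.
            if (this.clients == null && deque.remove(client)) {
                client.close();
            }
        }
    }

    @Override
    public void close() throws Exception {
        LinkedBlockingDeque<C> deque = this.clients;
        this.clients = null; // publish "closed" before draining
        if (deque != null) {
            C client;
            while ((client = deque.pollFirst()) != null) {
                client.close();
            }
        }
    }
}
```

Without the check in the finally block, a client returned after close() has drained the deque would sit there unclosed for the rest of the JVM's life.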
JingsongLi
reviewed
Jun 23, 2026
    this.dbUrl = options.get(URI);
    this.poolSize = options.get(CLIENT_POOL_SIZE);
    this.props = props;
    this.key = Key.of(dbUrl, options.get(JdbcCatalogOptions.CATALOG_KEY));
Contributor
This key does not include the JDBC connection properties used by JdbcClientPool (props is later passed to JdbcUtils.extractJdbcConfiguration, including jdbc.user / jdbc.password). If two catalogs in the same JVM use the same URI and catalog-key but different credentials, the second catalog will reuse the pool created by the first one and run with the wrong database user. Please include the effective JDBC connection properties in the cache key (and add a regression test with the same URI/catalog-key but different jdbc.user or jdbc.password).
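One way to build such a key is sketched below. JdbcPoolKey is a hypothetical class, and the rule that every option starting with "jdbc." counts as an effective connection property is an assumption made for the example; the real set is whatever JdbcUtils.extractJdbcConfiguration returns.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical cache key that includes the JDBC connection properties. */
final class JdbcPoolKey {
    private final String uri;
    private final String catalogKey;
    private final Map<String, String> jdbcProps;

    private JdbcPoolKey(String uri, String catalogKey, Map<String, String> jdbcProps) {
        this.uri = uri;
        this.catalogKey = catalogKey;
        this.jdbcProps = jdbcProps;
    }

    /** Assumption: options prefixed with "jdbc." are the ones that reach the driver. */
    static JdbcPoolKey of(String uri, String catalogKey, Map<String, String> options) {
        Map<String, String> jdbc = new TreeMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (e.getKey().startsWith("jdbc.")) {
                jdbc.put(e.getKey(), e.getValue());
            }
        }
        return new JdbcPoolKey(uri, catalogKey, Collections.unmodifiableMap(jdbc));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof JdbcPoolKey)) {
            return false;
        }
        JdbcPoolKey other = (JdbcPoolKey) o;
        return Objects.equals(uri, other.uri)
                && Objects.equals(catalogKey, other.catalogKey)
                && jdbcProps.equals(other.jdbcProps);
    }

    @Override
    public int hashCode() {
        return Objects.hash(uri, catalogKey, jdbcProps);
    }
}
```

With a key like this, the requested regression test reduces to asserting that two option maps differing only in jdbc.user or jdbc.password produce unequal keys, and therefore separate pools.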
Summary
- Adds CachedJdbcClientPool, a static cache that shares JdbcClientPool instances across all JdbcCatalog instances in the same JVM, keyed by (JDBC URI, catalog-key).
- Mirrors HiveCatalog's CachedClientPool for HMS Thrift connections.
- Reduces JDBC connections from O(parallelism × operators × pool_size) to O(TaskManagers × pool_size) in Flink CDC jobs.

For high-parallelism jobs this reduces the JDBC connection count by 8x or more.
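The two bounds can be checked with quick arithmetic. In the sketch below, parallelism 16, four operators, and pool size 2 come from this PR's description; the figure of 8 TaskManagers is purely an assumed deployment size, and the class name is hypothetical.

```java
/** Back-of-the-envelope connection counts; the TaskManager count is an assumption. */
final class ConnectionCountSketch {

    /** One dedicated pool per operator subtask. */
    static int dedicated(int parallelism, int operators, int poolSize) {
        return parallelism * operators * poolSize;
    }

    /** One shared pool per TaskManager JVM (for a single URI and catalog-key). */
    static int shared(int taskManagers, int poolSize) {
        return taskManagers * poolSize;
    }

    public static void main(String[] args) {
        int before = dedicated(16, 4, 2); // parser, schema evolution, writer, committer
        int after = shared(8, 2);         // assumed: 8 TaskManagers
        System.out.println(before + " -> " + after); // prints "128 -> 16"
    }
}
```

Under that assumption the job drops from 128 connections to 16, an 8x reduction; fewer TaskManagers would reduce the count further.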
Motivation
In Flink CDC sync jobs, every operator subtask that calls catalogLoader.load() creates a new JdbcCatalog with its own dedicated connection pool (default size: 2). With parallelism 16, a single job creates ~128 JDBC connections across parser, schema evolution, writer, and committer operators. The HiveCatalog avoids this via CachedClientPool, a static Caffeine cache that shares a single HMS client pool across all catalog instances in the same JVM.

This PR applies this pattern to JdbcCatalog.

Design Decision: ConcurrentHashMap vs Caffeine
HiveCatalog uses Caffeine with expireAfterAccess and avoids premature eviction by never exposing the raw pool: CachedClientPool implements ClientPool and delegates every run() call through clientPoolCache.get(key), which refreshes the access timer on each operation.

Using Caffeine for JDBC would require a larger change because all 16 methods in JdbcUtils are typed to accept JdbcClientPool, not the ClientPool interface. If using Caffeine is preferred I will do that instead. I have started with the simpler change. Please let me know.

Instead we use a ConcurrentHashMap with no time-based eviction. Pools live for the JVM lifetime and are closed via a shutdown hook. This is safe because:

Changes
- CachedJdbcClientPool (new): Static ConcurrentHashMap<Key, JdbcClientPool> keyed on (URI, catalog-key). JVM shutdown hook closes all pools.
- JdbcCatalog: Uses CachedJdbcClientPool.get() instead of creating a dedicated pool. close() no longer closes the shared pool.
- JdbcCatalogLockContext: Same; uses the shared cached pool.

Test plan
- JdbcCatalogTest (70 tests): passes
- JdbcClientPoolTest (4 tests): passes
- CachedJdbcClientPoolTest (6 tests): new tests for pool sharing semantics